We're doing regression!
We will use simple data structures and lots of visualization to give a clear picture of what is happening in each step. $\texttt{scikit-learn}$ will be used only when the implementation gets complicated.
This is basically a reproduction of the first few sections of The Elements of Statistical Learning (ESL), Chapter 6.
Presume a relationship between input data $X$ and target data $Y$:
$Y = f(X) + \textrm{noise}$
The goal in regression is to calculate a function $\hat{f}(X)$ that is a good estimate of $f(X)$.
In [ ]:
import sys
import heapq
import sklearn
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import norm
%matplotlib inline
Start with some simple examples of data distributions.
Remember: all parameter choices matter! We'll study the effects of our choices later.
In [ ]:
n_pts = 70 # number of data points
X = np.linspace(-1,1,n_pts) # build some X data
In [ ]:
# size of noise
sigma = 1
# get the corresponding Y data for a perfectly linear relationship with Gaussian noise
Y = X + (np.random.randn(n_pts)*sigma)
_ = plt.scatter(X,Y)
Now let's consider a more complicated relationship between the variables.
In [ ]:
def f(X):
# cubic function of X
return -10*X*X*X - 2*X*X
Switch to the familiar notation of training and test samples.
Note: we will generate a single array of x-values, and draw test and training sets of y-values.
In [ ]:
X_train = X
Y_train = f(X) + (np.random.randn(n_pts)*sigma)
_ = plt.scatter(X_train,Y_train)
Let's see how well ordinary linear regression does.
In [ ]:
# http://jmduke.com/posts/basic-linear-regressions-in-python/
def basic_linear_regression(x, y):
"""
Use least-square minimization to compute the regression coefficients
for a 1-dim linear model.
    Parameters:
        x: array of values for the independent (feature) variable
        y: array of values for the dependent (target) variable
    Return value:
        2-tuple of (slope, y-intercept)
"""
# Basic computations to save a little time.
length = len(x)
sum_x = sum(x)
sum_y = sum(y)
# Σx^2, and Σxy respectively.
sum_x_squared = sum(map(lambda a: a * a, x))
sum_of_products = sum([x[i] * y[i] for i in range(length)])
# Magic formulae!
a = (sum_of_products - (sum_x * sum_y) / length) / (sum_x_squared - ((sum_x ** 2) / length))
b = (sum_y - a * sum_x) / length
return a, b
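For reference, the "magic formulae" above are the closed-form least-squares solution for a single feature: the slope is $\hat{\beta}_1 = \frac{\sum_i x_i y_i - \frac{1}{n}\sum_i x_i \sum_i y_i}{\sum_i x_i^2 - \frac{1}{n}\left(\sum_i x_i\right)^2}$ and the intercept is $\hat{\beta}_0 = \bar{y} - \hat{\beta}_1\bar{x}$, which is exactly what the code computes with $\texttt{length} = n$.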
In [ ]:
B_1,B_0 = basic_linear_regression(X_train,Y_train)
Make a plotting function that makes a scatter plot of the data overlaid with the estimated regression function, $\hat{f}$.
In [ ]:
def plot(X,Y,Y_hat):
"""
Plot data and estimated regression function
Parameters:
        X: independent variable
        Y: dependent variable
        Y_hat: estimate of the dependent variable; f_hat(X)
"""
plt.scatter(X,Y,label='data')
plt.plot(X,Y_hat,label='estimate',color='g',linewidth=2)
plt.legend()
In [ ]:
Y_hat_train = X_train*B_1 + B_0
plot(X_train,Y_train,Y_hat_train)
How can we quantify the quality of the regression?
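A standard choice is the mean squared error, $\mathrm{MSE} = \frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{f}(x_i)\right)^2$, computed on both the training set and an independent test set drawn from the same $f(X)$.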
In [ ]:
def mse(Y_hat_train,Y_train,Y_test,print_results=True):
"""
Print mean squared error for test and train data
Parameters:
Y_hat_train: estimated y-values for the training set
Y_train: true y-values for the training set
Y_test: true y-values for an independant test set, based on the _same_ x-values as Y_train.
Return value:
tuple(training error, test error)
"""
train_err = np.mean([abs(yh-y)**2 for y,yh in zip(Y_train,Y_hat_train)])
test_err = np.mean([abs(yh-y)**2 for y,yh in zip(Y_test,Y_hat_train)])
if print_results:
print("train err: {0:.3f}".format(train_err))
print("test err: {0:.3f}".format(test_err))
else:
return train_err,test_err
In [ ]:
# draw a _test_ sample from f(X)
Y_test = f(X) + (np.random.randn(n_pts)*sigma)
In [ ]:
mse(Y_hat_train,Y_train,Y_test)
Remember how kNN works:
The value of the function $\hat{f}$ is calculated at every point $x_0$ in X and is given by the average of the $y$ values for the $k$ nearest neighbors in the training data.
$\hat{f}(x) = \mathrm{Ave}\left(y_i \mid x_i \in N_k(x)\right)$,
where $N_k(x)$ is the set of $k$ nearest neighbor points to $x$.
In [ ]:
def kNN(X,Y,x_0,k=20):
"""
Simple 1-D implementation of kNN average.
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        k: number of nearest neighbors to include
    Return value:
        The estimated regression function at x_0.
    For our purposes, think of the heap (a plain list managed by the heapq module) as a
    sorted container with nice performance properties: the smallest item is always popped
    first, and tuples are compared starting with their first element.
"""
nearest_neighbors = []
for x,y in zip(X,Y):
distance = abs(x-x_0)
heapq.heappush(nearest_neighbors,(distance,y))
    # pop the k smallest distances and average their y-values
    return np.mean( [heapq.heappop(nearest_neighbors)[1] for _ in range(k)] )
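A quick sanity check of that heapq behavior:
In [ ]:
h = []
heapq.heappush(h, (2.0, 'far'))
heapq.heappush(h, (0.5, 'near'))
heapq.heappush(h, (1.0, 'mid'))
print(heapq.heappop(h))  # (0.5, 'near'): the tuple with the smallest first element pops first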
In [ ]:
k = 15
Y_hat_train_knn = [kNN(X_train,Y_train,x_0,k=k) for x_0 in X_train]
In [ ]:
plot(X_train,Y_train,Y_hat_train_knn)
In [ ]:
mse(Y_hat_train_knn,Y_train,Y_test)
As $k\rightarrow 1$, the model exactly matches the training data, and the training error goes to zero. But the test error increases as the variance goes up.
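To see this, we could sweep $k$ and compare the errors (a quick sketch; the exact numbers depend on the random noise):
In [ ]:
# small k: low training error but higher test error (overfitting);
# large k: smoother fit with higher training error
for k_try in (1, 5, 15, 40):
    Y_hat = [kNN(X_train, Y_train, x_0, k=k_try) for x_0 in X_train]
    print("k = {}".format(k_try))
    mse(Y_hat, Y_train, Y_test)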
The neighborhood $N_k(x)$ acts as a kernel: it defines how the data points contribute to the calculation of the regression function, as a function of $X$. We can think of $N_k$ as assigning a weight of $0$ or $1$ to every point in the training data, as a function of $X$.
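To make the $0$/$1$-weight picture concrete, here is a minimal sketch ($\texttt{knn}\_\texttt{weights}$ is a helper introduced only for this illustration):
In [ ]:
def knn_weights(X, x_0, k):
    # weight 1 for the k nearest points to x_0, weight 0 for everything else
    distances = np.abs(np.asarray(X) - x_0)
    cutoff = np.sort(distances)[k-1]  # distance to the k-th nearest neighbor
    return (distances <= cutoff).astype(float)

x_0 = X_train[0]
# the weighted average with 0/1 weights reproduces the kNN estimate
# (up to ties at the cutoff distance)
print(np.average(Y_train, weights=knn_weights(X_train, x_0, k)))
print(kNN(X_train, Y_train, x_0, k=k))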
We can generalize the kNN function above to calculate the weighted average of $Y_{train}$ at $X_0$, for an arbitrary kernel.
In [ ]:
def kernel_smoother(X,Y,x_0,kernel,width):
"""
Generalization of 1-D kNN average, with custom kernel.
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        kernel: kernel function
        width: kernel width
    Return value:
        The estimated regression function at x_0.
"""
kernel_weights = [kernel(x_0,x,width) for x in X]
weighted_average = np.average(Y,weights=kernel_weights)
return weighted_average
In [ ]:
def epanechnikov_kernel(x_0,x,width):
"""
    Return the weight of the point x relative to the center x_0, for the given kernel width.
"""
def D(t):
if t <= 1:
            #return 3/4*float(1-t*t) <== under Python 2 this always returns 0.0, because 3/4 uses integer division; under Python 3 it works
return float(1-t*t)*3/4
else:
return 0
return D(abs(x-x_0)/width)
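For reference, this is the Epanechnikov kernel: $K_\lambda(x_0, x) = D\!\left(\frac{|x - x_0|}{\lambda}\right)$ with $D(t) = \frac{3}{4}(1 - t^2)$ for $|t| \le 1$ and $0$ otherwise, where $\lambda$ is the $\texttt{width}$ parameter.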
In [ ]:
# plot the Epanechnikov kernel at x_0 = 0, width = 1 to get a sense for it
Y = [epanechnikov_kernel(0,x,1) for x in X]
_ = plt.plot(X,Y)
Using the kernel smoother with an Epanechnikov kernel, make a better prediction function.
In [ ]:
width=0.35
Y_hat_train_epan_kernel = [kernel_smoother(X_train
,Y_train
,x_0
,kernel=epanechnikov_kernel
,width=width)
for x_0 in X_train]
In [ ]:
plot(X_train,Y_train,Y_hat_train_epan_kernel)
In [ ]:
mse(Y_hat_train_epan_kernel,Y_train,Y_test)
There are other kernels.
In [ ]:
def tri_cube_kernel(x_0,x,width):
def D(t):
if t <= 1:
return float(1-t*t*t)**3
else:
return 0
return D(abs(x-x_0)/width)
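For reference, the tri-cube kernel uses $D(t) = (1 - |t|^3)^3$ for $|t| \le 1$ and $0$ otherwise, while the Gaussian kernel plotted below uses the standard normal density, whose support is unbounded.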
In [ ]:
# plot some kernels at x_0 = 0, width = 1 to get a sense for them
Y1 = [epanechnikov_kernel(0,x,1) for x in X]
Y2 = [tri_cube_kernel(0,x,1) for x in X]
Y3 = [norm.pdf(x) for x in X]
plt.plot(X,Y1,label="Epanechnikov")
plt.plot(X,Y2,color='g',label="tri-cube")
plt.plot(X,Y3,color='k',label="Gaussian")
plt.legend(loc='best')
In [ ]:
Y_hat_train_tri_kernel = [kernel_smoother(X_train
,Y_train
,x_0,kernel=tri_cube_kernel
,width=width)
for x_0 in X_train]
In [ ]:
plot(X_train,Y_train,Y_hat_train_tri_kernel)
In [ ]:
mse(Y_hat_train_tri_kernel,Y_train,Y_test)
Manage the bias at the boundary by replacing the weighted average with a weighted linear fit: at each point $x_0$ in X, fit a straight line by weighted least squares and evaluate it at $x_0$.
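Concretely, local linear regression solves a separate weighted least-squares problem at each target point, $\min_{\beta_0(x_0),\,\beta_1(x_0)} \sum_{i=1}^{N} K_\lambda(x_0, x_i)\left[y_i - \beta_0(x_0) - \beta_1(x_0)\,x_i\right]^2$, and the estimate is $\hat{f}(x_0) = \hat{\beta}_0(x_0) + \hat{\beta}_1(x_0)\,x_0$. The $\texttt{sample}\_\texttt{weight}$ argument of scikit-learn's $\texttt{LinearRegression.fit}$ handles the weighting.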
In [ ]:
from sklearn.linear_model import LinearRegression
def linear_kernel_model(X,Y,x_0,kernel,width):
"""
1-D kernel-smoothed model with local linear regression.
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        kernel: kernel function
        width: kernel width
    Return value:
        The estimated regression function at x_0.
"""
kernel_weights = [kernel(x_0,x,width) for x in X]
# the scikit-learn functions want something more numpy-like: an array of arrays
X = [[x] for x in X]
wls_model = LinearRegression()
    wls_model.fit(X,Y,sample_weight=kernel_weights)
B_0 = wls_model.intercept_
B_1 = wls_model.coef_[0]
y_hat = B_0 + B_1*x_0
return y_hat
In [ ]:
Y_hat_train_linear_reg_epan_kernel = [
linear_kernel_model(X_train
,Y_train
,x_0
,kernel=epanechnikov_kernel
,width=width)
for x_0 in X_train]
In [ ]:
plot(X_train,Y_train,Y_hat_train_linear_reg_epan_kernel)
In [ ]:
mse(Y_hat_train_linear_reg_epan_kernel,Y_train,Y_test)
How can we optimize the value of meta-parameters like the kernel width?
Remember, the best value of such a parameter depends on other choices, like $\texttt{n}\_\texttt{pts}$ and $\texttt{sigma}$, so it should be re-tuned when those change.
In [ ]:
def test_width(X_train,Y_train,Y_test,values,model=linear_kernel_model,kernel=epanechnikov_kernel):
"""
Make a plot of the test and training mse as a function of some parameter
"""
train_errors = []
test_errors = []
for width in values:
Y_hat_train = [
model(X_train
,Y_train
,x_0,kernel=kernel
,width=width
)
for x_0 in X_train]
train_err,test_err = mse(Y_hat_train,Y_train,Y_test,print_results=False)
train_errors.append(train_err)
test_errors.append(test_err)
plt.plot(values,train_errors,'g.',ms=10,label="train error")
plt.plot(values,test_errors,'b.',ms=10,label="test error")
plt.legend(loc='best')
In [ ]:
widths = np.linspace(0.001,1,50) # 50 evenly-spaced width values
test_width(X_train,Y_train,Y_test,widths,model=linear_kernel_model,kernel=epanechnikov_kernel)
In [ ]:
width=0.2
In [ ]:
def f_2D(X):
return [(2*x[1]**3) + x[1]*x[0]*4 for x in X]
def f_3D(X):
return [(-2*x[0]*x[0]) + (2*x[1]**3) + x[2]*x[1]*x[0]*4 for x in X]
def f_nD_poly(X,n=2):
"""
    For each point in X, return the sum of x**n over its feature coordinates.
    """
    return [ sum([x**n for x in point]) for point in X]
In [ ]:
import random
n_pts = 50
X_train = np.array([[random.random(),random.random()] for _ in range(n_pts)])
In [ ]:
sigma = 0.1
Y_train = f_nD_poly(X_train,2) + ( np.random.randn(n_pts) * sigma )
Y_test = f_nD_poly(X_train,2) + ( np.random.randn(n_pts) * sigma )
In [ ]:
def kNN_multiD(X,Y,x_0,k=20,kernel_pars=None):
"""
Simple multi-dimensional implementation of kNN average.
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        k: number of nearest neighbors to include
        kernel_pars: unused here
    Return value:
        The estimated regression function at x_0.
    Note: use numpy.linalg.norm for N-dim norms.
"""
nearest_neighbors = []
for x,y in zip(X,Y):
distance = np.linalg.norm(np.array(x)-np.array(x_0))
heapq.heappush(nearest_neighbors,(distance,y))
    return np.mean( [heapq.heappop(nearest_neighbors)[1] for _ in range(k)] )
Remember that, because the dimensionality of the features is no longer one, the scale of the error will be different. So don't compare the numbers below with those from the 1-D examples above.
In [ ]:
Y_hat_train = [
kNN_multiD(X_train,Y_train,x_0,k=k)
for x_0 in X_train]
mse(Y_hat_train,Y_train,Y_test)
In [ ]:
def epanechnikov_kernel_multiD(x_0,x,width=1):
def D(t):
#print("width = {}".format(width))
if t <= 1:
return float(1-t*t)*3/4
else:
return 0
return D(np.linalg.norm(np.array(x)-np.array(x_0))/width)
Let's also generalize the model so that the regression need not be simple and linear.
In [ ]:
def generalized_kernel_model(X,Y,x_0,kernel=epanechnikov_kernel_multiD,kernel_pars={},regressor=LinearRegression):
"""
Multi-D kernel-smoothed model with local generalized regression.
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        kernel: kernel function
        kernel_pars: dict of keyword arguments for the kernel, e.g. {"width": 0.5}
        regressor: regression class - must follow the scikit-learn API
    Return value:
        The estimated regression function at x_0.
"""
kernel_weights = [kernel(x_0,x,**kernel_pars) for x in X]
model = regressor()
model.fit(X,Y,sample_weight=kernel_weights)
return model.predict([x_0])[0]
In [ ]:
from sklearn.linear_model import Ridge,Lasso,ElasticNet
regressor=LinearRegression
width = 0.5
Y_hat_train = [
generalized_kernel_model(X_train
,Y_train
,x_0
,kernel=epanechnikov_kernel_multiD
,kernel_pars={"width":width}
,regressor=regressor)
for x_0 in X_train]
Compare the MSE here to that of the kNN above.
In [ ]:
mse(Y_hat_train,Y_train,Y_test)
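Since any scikit-learn regressor that accepts $\texttt{sample}\_\texttt{weight}$ can be plugged in, we could, for example, try a local Ridge fit (a sketch with the default regularization strength, untuned):
In [ ]:
Y_hat_train_ridge = [
    generalized_kernel_model(X_train
                            ,Y_train
                            ,x_0
                            ,kernel=epanechnikov_kernel_multiD
                            ,kernel_pars={"width":width}
                            ,regressor=Ridge)
    for x_0 in X_train]
mse(Y_hat_train_ridge,Y_train,Y_test)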
In [ ]:
def test_parameter(parameter_name,values,args):
"""
Make a plot of the test and training mse as a function of some parameter
Parameters:
parameter name:
values:
args:
"""
train_errors = []
test_errors = []
# get the dictionary element and shortened name for the variable parameter
par_name = ""
X_train = np.array([[random.random() for _ in range(args['main']['n_dim'])] for _ in range(args['main']['n_pts'])])
Y_train = args['main']['func'](X_train) + ( np.random.randn(args['main']['n_pts']) * args['main']['sigma'] )
Y_test = args['main']['func'](X_train) + ( np.random.randn(args['main']['n_pts']) * args['main']['sigma'] )
for value in values:
# set the value of the variable parameter for this iteration
location = args
for key_num,key in enumerate(parameter_name.split(':')):
par_name = key
if key_num+1 < len(parameter_name.split(':')):
location = location[key]
else:
location[key] = value
Y_hat_train = [
args['main']['model_name'](X_train
,Y_train
,x_0
,kernel=args['main']['kernel']
,**args['model']
)
for x_0 in X_train]
train_err,test_err = mse(Y_hat_train,Y_train,Y_test,print_results=False)
train_errors.append(train_err)
test_errors.append(test_err)
plt.plot(values,train_errors,'g.',ms=15,label="train error")
plt.plot(values,test_errors,'b.',ms=15,label="test error")
plt.title(par_name)
plt.legend(loc='best')
In [ ]:
arguments = {
"main":{
"func":f_nD_poly,
"model_name":generalized_kernel_model,
"kernel":epanechnikov_kernel_multiD,
"n_pts":30,
"sigma":0.1,
"n_dim":2
},
"model":{
"regressor":LinearRegression,
"kernel_pars":{
"width":0.2
}
}
}
test_parameter("model:kernel_pars:width",np.linspace(0.01,1.5,30),arguments)